﻿using FreeOPC.Config;
using FreeOPC.Server.OPCServer;
using Microsoft.Extensions.Hosting;
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
using Serilog;
using Serilog.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;


namespace FreeOPC.Server.MQTTService
{
    /// <summary>
    /// Hosted background service that starts an MQTT broker, initializes the OPC server
    /// and periodically writes a heartbeat with memory information to the log.
    /// </summary>
    public class MqttService : BackgroundService
    {
        /// <summary>
        /// The logger.
        /// </summary>
        private readonly ILogger logger;

        /// <summary>
        /// The service name.
        /// </summary>
        private readonly string serviceName;

        /// <summary>
        /// Gets or sets the MQTT server instance created by <see cref="StartAsync"/>.
        /// It is <c>null</c> until the service has been started.
        /// </summary>
        public static MqttServer mqttServer { get; set; }

        /// <summary>
        /// The bytes divider (1024 * 1024). Used to convert a byte count to mebibytes.
        /// </summary>
        private static double BytesDivider => 1048576.0;

        /// <summary>
        /// Gets or sets the MQTT service configuration.
        /// </summary>
        public MqttConfiguration MqttServiceConfiguration { get; set; }

        /// <summary>
        /// Initializes a new instance of the <see cref="MqttService"/> class.
        /// </summary>
        /// <param name="mqttServiceConfiguration">The MQTT service configuration.</param>
        /// <param name="serviceName">The service name.</param>
        public MqttService(MqttConfiguration mqttServiceConfiguration, string serviceName)
        {
            this.MqttServiceConfiguration = mqttServiceConfiguration;
            this.serviceName = serviceName;

            // Create the logger.
            this.logger = LoggerConfig.GetLoggerConfiguration(nameof(MqttService))
                .WriteTo.Sink((ILogEventSink)Log.Logger)
                .CreateLogger();
        }

        /// <summary>
        /// Validates the configuration, starts the MQTT server, initializes the OPC server
        /// and then starts the background loop.
        /// </summary>
        /// <param name="cancellationToken">The cancellation token passed to the base implementation.</param>
        /// <exception cref="InvalidOperationException">Thrown if the configuration is missing or invalid.</exception>
        public override async Task StartAsync(CancellationToken cancellationToken)
        {
            if (this.MqttServiceConfiguration is null || !this.MqttServiceConfiguration.IsValid())
            {
                throw new InvalidOperationException("The configuration is invalid");
            }

            this.logger.Information("Starting service");

            // Await the broker start so that start-up failures surface here
            // and the OPC server is only initialized once the broker is running.
            await this.StartMqttServerAsync();
            OPCStartService.InitOPCServer();
            this.logger.Information("Service started");
            SerilogServer.WriteLog("FreeOPCSever", new string[] { "服务OPC初始化结束" });
            await base.StartAsync(cancellationToken);
            SerilogServer.WriteLog("FreeOPCSever", new string[] { "StartAsync end" });
        }

        /// <summary>
        /// Stops the background loop and afterwards the MQTT server (if one was created).
        /// </summary>
        /// <param name="cancellationToken">The cancellation token passed to the base implementation.</param>
        public override async Task StopAsync(CancellationToken cancellationToken)
        {
            await base.StopAsync(cancellationToken);

            // Shut the broker down as well; otherwise it keeps running after the service stopped.
            var server = mqttServer;
            if (server != null)
            {
                await server.StopAsync();
            }
        }

        /// <summary>
        /// Runs the heartbeat loop: logs memory information, then waits for the configured delay,
        /// until cancellation is requested.
        /// </summary>
        /// <param name="cancellationToken">The token that signals the loop to end.</param>
        protected override async Task ExecuteAsync(CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    // Log some memory information.
                    this.LogMemoryInformation();
                    await Task.Delay(this.MqttServiceConfiguration.DelayInMilliSeconds, cancellationToken);
                }
                catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
                {
                    // A cancelled delay is the regular shutdown path, not an error.
                    break;
                }
                catch (Exception ex)
                {
                    this.logger.Error("An error occurred: {Exception}", ex);
                }
            }
        }

        /// <summary>
        /// Validates the MQTT connection by comparing the supplied user name and password
        /// with the users from the configuration, and sets the reason code accordingly.
        /// </summary>
        /// <param name="args">The arguments.</param>
        /// <returns>A completed task, or a faulted task if the validation itself failed.</returns>
        public Task ValidateConnectionAsync(ValidatingConnectionEventArgs args)
        {
            try
            {
                var currentUser = this.MqttServiceConfiguration.Users.FirstOrDefault(u => u.UserName == args.UserName);

                // The user was looked up by name, so only "not found" and "wrong password" remain.
                if (currentUser == null || args.Password != currentUser.Password)
                {
                    args.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;

                    // NOTE(review): this writes the attempted password to the log in plain text — confirm this is intended.
                    this.LogMessage(args, true);
                    return Task.CompletedTask;
                }

                args.ReasonCode = MqttConnectReasonCode.Success;
                this.LogMessage(args, false);
                return Task.CompletedTask;
            }
            catch (Exception ex)
            {
                this.logger.Error("An error occurred: {Exception}.", ex);

                // Fail closed: never leave a reason code behind that could be read as accepted.
                if (args != null)
                {
                    args.ReasonCode = MqttConnectReasonCode.UnspecifiedError;
                }

                return Task.FromException(ex);
            }
        }

        /// <summary>
        /// Validates the MQTT subscriptions. Every subscription is accepted and logged.
        /// </summary>
        /// <param name="args">The arguments.</param>
        /// <returns>A completed task, or a faulted task if an error occurred.</returns>
        public Task InterceptSubscriptionAsync(InterceptingSubscriptionEventArgs args)
        {
            try
            {
                args.ProcessSubscription = true;
                this.LogMessage(args, true);
                return Task.CompletedTask;
            }
            catch (Exception ex)
            {
                this.logger.Error("An error occurred: {Exception}.", ex);
                return Task.FromException(ex);
            }
        }

        /// <summary>
        /// Validates the MQTT application messages. Every message is accepted and logged.
        /// </summary>
        /// <param name="args">The arguments.</param>
        /// <returns>A completed task, or a faulted task if an error occurred.</returns>
        public Task InterceptApplicationMessagePublishAsync(InterceptingPublishEventArgs args)
        {
            try
            {
                args.ProcessPublish = true;
                this.LogMessage(args);
                return Task.CompletedTask;
            }
            catch (Exception ex)
            {
                this.logger.Error("An error occurred: {Exception}.", ex);
                return Task.FromException(ex);
            }
        }

        /// <summary>
        /// Creates the MQTT server, registers the validation/interception handlers and starts it.
        /// </summary>
        /// <returns>A task that completes once the server start has finished.</returns>
        private async Task StartMqttServerAsync()
        {
            // NOTE(review): only the encrypted endpoint *port* is set here; the TLS endpoint itself
            // does not appear to be enabled (no WithEncryptedEndpoint()/certificate) — confirm.
            var optionsBuilder = new MqttServerOptionsBuilder()
                .WithDefaultEndpoint()
                .WithDefaultEndpointPort(this.MqttServiceConfiguration.Port)
                .WithEncryptedEndpointPort(this.MqttServiceConfiguration.TlsPort);

            mqttServer = new MqttFactory().CreateMqttServer(optionsBuilder.Build());
            mqttServer.ValidatingConnectionAsync += this.ValidateConnectionAsync;
            mqttServer.InterceptingSubscriptionAsync += this.InterceptSubscriptionAsync;
            mqttServer.InterceptingPublishAsync += this.InterceptApplicationMessagePublishAsync;
            await mqttServer.StartAsync();
        }

        /// <summary>
        /// Injects a test message with the topic "HelloWorld" into the broker.
        /// The injection is not awaited; a failure of the injection is written to the log.
        /// </summary>
        /// <param name="id">The client identifier used as sender of the message.</param>
        /// <param name="msg">The payload of the message.</param>
        /// <exception cref="InvalidOperationException">Thrown if the MQTT server has not been created yet.</exception>
        public void TestPublish(string id, string msg)
        {
            var server = mqttServer;
            if (server == null)
            {
                throw new InvalidOperationException("The MQTT server has not been started");
            }

            Console.WriteLine(id);
            var message = new MqttApplicationMessageBuilder().WithTopic("HelloWorld").WithPayload(msg).Build();

            // Now inject the new message at the broker.
            var injection = server.InjectApplicationMessage(
                new InjectedMqttApplicationMessage(message)
                {
                    SenderClientId = id
                });

            // The method is synchronous, so observe a failure of the injection instead of dropping it silently.
            _ = injection.ContinueWith(
                t => this.logger.Error("An error occurred: {Exception}.", t.Exception),
                CancellationToken.None,
                TaskContinuationOptions.OnlyOnFaulted,
                TaskScheduler.Default);
        }

        /// <summary>
        ///     Logs the message from the MQTT subscription interceptor context.
        /// </summary>
        /// <param name="args">The arguments.</param>
        /// <param name="successful">A <see cref="bool"/> value indicating whether the subscription was successful or not.</param>
        private void LogMessage(InterceptingSubscriptionEventArgs args, bool successful)
        {
#pragma warning disable Serilog004 // Constant MessageTemplate verifier
            this.logger.Information(
                successful
                    ? "New subscription: ClientId = {ClientId}, TopicFilter = {TopicFilter}"
                    : "Subscription failed for clientId = {ClientId}, TopicFilter = {TopicFilter}",
                args.ClientId,
                args.TopicFilter);
#pragma warning restore Serilog004 // Constant MessageTemplate verifier
        }

        /// <summary>
        ///     Logs the message from the MQTT message interceptor context.
        ///     The payload is decoded as UTF-8 text for the log entry.
        /// </summary>
        /// <param name="args">The arguments.</param>
        private void LogMessage(InterceptingPublishEventArgs args)
        {
            var payload = args.ApplicationMessage?.Payload is null ? null : Encoding.UTF8.GetString(args.ApplicationMessage.Payload);

            this.logger.Information(
                "Message: ClientId = {ClientId}, Topic = {Topic}, Payload = {Payload}, QoS = {Qos}, Retain-Flag = {RetainFlag}",
                args.ClientId,
                args.ApplicationMessage?.Topic,
                payload,
                args.ApplicationMessage?.QualityOfServiceLevel,
                args.ApplicationMessage?.Retain);
        }

        /// <summary>
        ///     Logs the message from the MQTT connection validation context.
        /// </summary>
        /// <param name="args">The arguments.</param>
        /// <param name="showPassword">A <see cref="bool"/> value indicating whether the password is written to the log or not.</param>
        private void LogMessage(ValidatingConnectionEventArgs args, bool showPassword)
        {
            if (showPassword)
            {
                this.logger.Information(
                    "New connection: ClientId = {ClientId}, Endpoint = {Endpoint}, Username = {UserName}, Password = {Password}, CleanSession = {CleanSession}",
                    args.ClientId,
                    args.Endpoint,
                    args.UserName,
                    args.Password,
                    args.CleanSession);
            }
            else
            {
                this.logger.Information(
                    "New connection: ClientId = {ClientId}, Endpoint = {Endpoint}, Username = {UserName}, CleanSession = {CleanSession}",
                    args.ClientId,
                    args.Endpoint,
                    args.UserName,
                    args.CleanSession);
            }
        }

        /// <summary>
        /// Logs the heartbeat message with some memory information.
        /// All values are divided by <see cref="BytesDivider"/> and formatted with three decimals.
        /// </summary>
        private void LogMemoryInformation()
        {
            var totalMemory = GC.GetTotalMemory(false);
            var memoryInfo = GC.GetGCMemoryInfo();
            var divider = BytesDivider;
            Log.Information(
                "Heartbeat for service {ServiceName}: Total {Total}, heap size: {HeapSize}, memory load: {MemoryLoad}.",
                this.serviceName, $"{(totalMemory / divider):N3}", $"{(memoryInfo.HeapSizeBytes / divider):N3}", $"{(memoryInfo.MemoryLoadBytes / divider):N3}");
        }
    }
}
